/*** Eclipse Class Decompiler plugin, copyright (c) 2012 Chao Chen (cnfree2000@hotmail.com) ***/
package com.sankuai.xm.kafka.client.factory;

import com.sankuai.xm.kafka.client.IProducerProcessor;
import com.sankuai.xm.kafka.client.exception.ProducerRuntimeException;
import com.sankuai.xm.kafka.client.utils.FutureCallback;
import com.sankuai.xm.kafka.client.utils.StackTraceUtil;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultProducerProcessor<K, V>
  implements IProducerProcessor<K, V>
{
  private static final Logger log = LoggerFactory.getLogger(DefaultProducerProcessor.class);
  private Producer<K, V> producer;
  private String topic;

  public DefaultProducerProcessor(Producer<K, V> producer, String topic)
  {
    this.producer = producer;
    this.topic = topic;
  }

  public Producer<K, V> getProducer() {
    return this.producer;
  }

  public void sendAsyncMessage(V msg, K partationField, FutureCallback futureCallback)
    throws Exception
  {
    if (msg == null) {
      log.error("xm-kafka-client send msg is null.");
      throw new ProducerRuntimeException("xm-kafka-client send msg is null.");
    }
    try {
      KeyedMessage data = new KeyedMessage(this.topic, null, partationField, msg);
      getProducer().send(data);
      futureCallback.onSuccess(msg);
    } catch (Exception e) {
      log.error("xm-kafka-client, send.async occur exception = {}.", StackTraceUtil.getStackTrace(e));
      futureCallback.onFailure(msg, e);
    }
  }

  public void sendAsyncMessage(V msg, FutureCallback futureCallback)
    throws Exception
  {
    if (msg == null) {
      log.error("xm-kafka-client send msg is null.");
      throw new ProducerRuntimeException("xm-kafka-client send msg is null.");
    }
    try {
      KeyedMessage data = new KeyedMessage(this.topic, null, msg.toString(), msg);
      getProducer().send(data);
      futureCallback.onSuccess(msg);
    } catch (Exception e) {
      log.error("xm-kafka-client, send.async occur exception = {}.", StackTraceUtil.getStackTrace(e));
      futureCallback.onFailure(msg, e);
    }
  }

  public void close() throws Exception
  {
    getProducer().close();
  }
}